Branch data Line data Source code
1 : : /**@file
2 : : * This file is part of the event library; it contains the implementation of the
3 : : * polling event loop functions.
4 : : *
5 : : * @see lely/ev/loop.h
6 : : *
7 : : * @copyright 2018-2019 Lely Industries N.V.
8 : : *
9 : : * @author J. S. Seldenthuis <jseldenthuis@lely.com>
10 : : *
11 : : * Licensed under the Apache License, Version 2.0 (the "License");
12 : : * you may not use this file except in compliance with the License.
13 : : * You may obtain a copy of the License at
14 : : *
15 : : * http://www.apache.org/licenses/LICENSE-2.0
16 : : *
17 : : * Unless required by applicable law or agreed to in writing, software
18 : : * distributed under the License is distributed on an "AS IS" BASIS,
19 : : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 : : * See the License for the specific language governing permissions and
21 : : * limitations under the License.
22 : : */
23 : :
24 : : #include "ev.h"
25 : :
26 : : #if !LELY_NO_MALLOC
27 : :
28 : : #if !LELY_NO_THREADS
29 : : #include <lely/libc/stdatomic.h>
30 : : #include <lely/libc/threads.h>
31 : : #endif
32 : : #include <lely/ev/exec.h>
33 : : #define LELY_EV_LOOP_INLINE extern inline
34 : : #include <lely/ev/loop.h>
35 : : #include <lely/ev/std_exec.h>
36 : : #include <lely/ev/task.h>
37 : : #include <lely/util/dllist.h>
38 : : #include <lely/util/errnum.h>
39 : : #include <lely/util/time.h>
40 : : #include <lely/util/util.h>
41 : :
42 : : #include <assert.h>
43 : : #include <stdint.h>
44 : : #include <stdlib.h>
45 : :
46 : : #ifndef LELY_EV_LOOP_CTX_MAX_UNUSED
47 : : /// The maximum number of unused contexts per event loop.
48 : : #define LELY_EV_LOOP_CTX_MAX_UNUSED 16
49 : : #endif
50 : :
51 : : /// An event loop context.
52 : : struct ev_loop_ctx {
53 : : /**
54 : : * The number of references to this context. Once the reference count
55 : : * reaches zero, this struct is reclaimed.
56 : : */
57 : : size_t refcnt;
58 : : /// A pointer to the event loop managing this context.
59 : : ev_loop_t *loop;
60 : : /// The future on which the loop is waiting.
61 : : ev_future_t *future;
62 : : /// The task to be executed once the future is ready.
63 : : struct ev_task task;
64 : : /// The address of the stopped flag of the thread.
65 : : int *pstopped;
66 : : #if !LELY_NO_THREADS
67 : : /**
68 : : * The condition variable used by threads to wait for a task to be
69 : : * submitted to the event loop or for the event loop to be stopped or
70 : : * interrupted.
71 : : */
72 : : cnd_t cond;
73 : : /// A flag indicating if a thread is waiting on #cond.
74 : : unsigned waiting : 1;
75 : : #endif
76 : : /// A flag indicating if #future is ready.
77 : : unsigned ready : 1;
78 : : /// A flag indicating if a thread is polling.
79 : : unsigned polling : 1;
80 : : /// The thread identifier of the polling instance.
81 : : void *thr;
82 : : /// The node of this context in the list of waiting or polling contexts.
83 : : struct dlnode node;
84 : : /**
85 : : * A pointer to the next context in the list of running or unused
86 : : * contexts.
87 : : */
88 : : struct ev_loop_ctx *next;
89 : : };
90 : :
91 : : static void ev_loop_ctx_task_func(struct ev_task *task);
92 : :
93 : : static struct ev_loop_ctx *ev_loop_ctx_alloc(void);
94 : : static void ev_loop_ctx_free(struct ev_loop_ctx *ctx);
95 : :
96 : : static void ev_loop_ctx_release(struct ev_loop_ctx *ctx);
97 : :
98 : : static struct ev_loop_ctx *ev_loop_ctx_create(
99 : : ev_loop_t *loop, ev_future_t *future);
100 : : static void ev_loop_ctx_destroy(struct ev_loop_ctx *ctx);
101 : :
102 : : static size_t ev_loop_ctx_wait_one(struct ev_loop_ctx **pctx, ev_loop_t *loop,
103 : : ev_future_t *future);
104 : : static size_t ev_loop_ctx_wait_one_until(struct ev_loop_ctx **pctx,
105 : : ev_loop_t *loop, ev_future_t *future,
106 : : const struct timespec *abs_time);
107 : :
108 : : static int ev_loop_ctx_kill(struct ev_loop_ctx *ctx, int stop);
109 : :
110 : : /// An event loop thread.
111 : : struct ev_loop_thrd {
112 : : /// A flag used to interrupt the event loop on this thread.
113 : : int stopped;
114 : : /// A pointer to the event loop context for this thread.
115 : : struct ev_loop_ctx *ctx;
116 : : };
117 : :
118 : : #if LELY_NO_THREADS
119 : : static struct ev_loop_thrd ev_loop_thrd = { 0, NULL };
120 : : #else
121 : : static _Thread_local struct ev_loop_thrd ev_loop_thrd = { 0, NULL };
122 : : #endif
123 : :
124 : : static void ev_loop_std_exec_impl_on_task_init(ev_std_exec_impl_t *impl);
125 : : static void ev_loop_std_exec_impl_on_task_fini(ev_std_exec_impl_t *impl);
126 : : static void ev_loop_std_exec_impl_post(
127 : : ev_std_exec_impl_t *impl, struct ev_task *task);
128 : : static size_t ev_loop_std_exec_impl_abort(
129 : : ev_std_exec_impl_t *impl, struct ev_task *task);
130 : :
131 : : // clang-format off
132 : : static const struct ev_std_exec_impl_vtbl ev_loop_std_exec_impl_vtbl = {
133 : : &ev_loop_std_exec_impl_on_task_init,
134 : : &ev_loop_std_exec_impl_on_task_fini,
135 : : &ev_loop_std_exec_impl_post,
136 : : &ev_loop_std_exec_impl_abort
137 : : };
138 : : // clang-format on
139 : :
140 : : /// A polling event loop.
141 : : struct ev_loop {
142 : : /// A pointer to the interface used to poll for events (can be `NULL`).
143 : : ev_poll_t *poll;
144 : : /**
145 : : * The number of threads allowed to poll simultaneously. If <b>npoll</b>
146 : : * is 0, there is no limit.
147 : : */
148 : : size_t npoll;
149 : : /**
150 : : * A pointer to the virtual table containing the interface used by the
151 : : * standard executor (#exec).
152 : : */
153 : : const struct ev_std_exec_impl_vtbl *impl_vptr;
154 : : /// The executor corresponding to the event loop.
155 : : struct ev_std_exec exec;
156 : : #if !LELY_NO_THREADS
157 : : /// The mutex protecting the task queue.
158 : : mtx_t mtx;
159 : : #endif
160 : : /// The queue of pending tasks.
161 : : struct sllist queue;
162 : : /// The task used to trigger polling.
163 : : struct ev_task task;
164 : : /**
165 : : * The number of pending tasks. This equals the number tasks in #queue
166 : : * plus the number of calls to ev_exec_on_task_init() minus those to
167 : : * ev_exec_on_task_fini(). ev_loop_stop() is called once this value
168 : : * reaches 0.
169 : : */
170 : : #if LELY_NO_THREADS || (LELY_NO_ATOMICS && (!_WIN32 || defined(__MINGW32__)))
171 : : size_t ntasks;
172 : : #elif _WIN64 && !defined(__MINGW32__)
173 : : volatile LONGLONG ntasks;
174 : : #elif _WIN32 && !defined(__MINGW32__)
175 : : volatile LONG ntasks;
176 : : #else
177 : : atomic_size_t ntasks;
178 : : #endif
179 : : /// A flag specifying whether the event loop is stopped.
180 : : int stopped;
181 : : #if !LELY_NO_THREADS
182 : : /// The list of waiting contexts.
183 : : struct dllist waiting;
184 : : #endif
185 : : /// The list of polling contexts.
186 : : struct dllist polling;
187 : : /// The number of polling contexts.
188 : : size_t npolling;
189 : : /// The list of unused contexts.
190 : : struct ev_loop_ctx *unused;
191 : : /**
192 : : * The number of unused contexts. This WILL NOT exceed
193 : : * #LELY_EV_LOOP_CTX_MAX_UNUSED.
194 : : */
195 : : size_t nunused;
196 : : };
197 : :
198 : : static inline ev_loop_t *ev_loop_from_impl(const ev_std_exec_impl_t *impl);
199 : :
200 : : static int ev_loop_empty(const ev_loop_t *loop);
201 : : static size_t ev_loop_ntasks(const ev_loop_t *loop);
202 : :
203 : : static void ev_loop_do_stop(ev_loop_t *loop);
204 : :
205 : : static int ev_loop_kill_any(ev_loop_t *loop, int polling);
206 : :
207 : : void *
208 : 8 : ev_loop_alloc(void)
209 : : {
210 : 8 : void *ptr = malloc(sizeof(ev_loop_t));
211 : : #if !LELY_NO_ERRNO
212 [ - + ]: 8 : if (!ptr)
213 : 0 : set_errc(errno2c(errno));
214 : : #endif
215 : 8 : return ptr;
216 : : }
217 : :
218 : : void
219 : 8 : ev_loop_free(void *ptr)
220 : : {
221 : 8 : free(ptr);
222 : 8 : }
223 : :
224 : : ev_loop_t *
225 : 8 : ev_loop_init(ev_loop_t *loop, ev_poll_t *poll, size_t npoll, int poll_task)
226 : : {
227 : : assert(loop);
228 : :
229 : 8 : loop->poll = poll;
230 : 8 : loop->npoll = npoll;
231 : :
232 : 8 : loop->impl_vptr = &ev_loop_std_exec_impl_vtbl;
233 : 8 : ev_std_exec_init(ev_loop_get_exec(loop), &loop->impl_vptr);
234 : :
235 : : #if !LELY_NO_THREADS
236 [ - + ]: 8 : if (mtx_init(&loop->mtx, mtx_plain) != thrd_success)
237 : 0 : return NULL;
238 : : #endif
239 : :
240 : 8 : sllist_init(&loop->queue);
241 : :
242 : 8 : loop->task = (struct ev_task)EV_TASK_INIT(NULL, NULL);
243 [ + + - + ]: 8 : if (loop->poll && poll_task)
244 : 0 : sllist_push_back(&loop->queue, &loop->task._node);
245 : :
246 : : #if LELY_NO_THREADS || LELY_NO_ATOMICS || (_WIN32 && !defined(__MINGW32__))
247 : : loop->ntasks = 0;
248 : : #else
249 : 8 : atomic_init(&loop->ntasks, 0);
250 : : #endif
251 : :
252 : 8 : loop->stopped = 0;
253 : :
254 : : #if !LELY_NO_THREADS
255 : 8 : dllist_init(&loop->waiting);
256 : : #endif
257 : 8 : dllist_init(&loop->polling);
258 : 8 : loop->npolling = 0;
259 : :
260 : 8 : loop->unused = NULL;
261 : 8 : loop->nunused = 0;
262 : :
263 : 8 : return loop;
264 : : }
265 : :
266 : : void
267 : 8 : ev_loop_fini(ev_loop_t *loop)
268 : : {
269 : : assert(loop);
270 : :
271 [ + + ]: 13 : while (loop->unused) {
272 : 5 : struct ev_loop_ctx *ctx = loop->unused;
273 : 5 : loop->unused = ctx->next;
274 : 5 : ev_loop_ctx_free(ctx);
275 : : }
276 : :
277 : : assert(!loop->npolling);
278 : : assert(dllist_empty(&loop->polling));
279 : : #if !LELY_NO_THREADS
280 : : assert(dllist_empty(&loop->waiting));
281 : : #endif
282 : :
283 : : #if !LELY_NO_THREADS
284 : 8 : mtx_destroy(&loop->mtx);
285 : : #endif
286 : :
287 : 8 : ev_std_exec_fini(ev_loop_get_exec(loop));
288 : 8 : }
289 : :
290 : : ev_loop_t *
291 : 8 : ev_loop_create(ev_poll_t *poll, size_t npoll, int poll_task)
292 : : {
293 : 8 : int errc = 0;
294 : :
295 : 8 : ev_loop_t *loop = ev_loop_alloc();
296 [ - + ]: 8 : if (!loop) {
297 : 0 : errc = get_errc();
298 : 0 : goto error_alloc;
299 : : }
300 : :
301 : 8 : ev_loop_t *tmp = ev_loop_init(loop, poll, npoll, poll_task);
302 [ - + ]: 8 : if (!tmp) {
303 : 0 : errc = get_errc();
304 : 0 : goto error_init;
305 : : }
306 : 8 : loop = tmp;
307 : :
308 : 8 : return loop;
309 : :
310 : 0 : error_init:
311 : 0 : ev_loop_free(loop);
312 : 0 : error_alloc:
313 : 0 : set_errc(errc);
314 : 0 : return NULL;
315 : : }
316 : :
317 : : void
318 : 8 : ev_loop_destroy(ev_loop_t *loop)
319 : : {
320 [ + - ]: 8 : if (loop) {
321 : 8 : ev_loop_fini(loop);
322 : 8 : ev_loop_free(loop);
323 : : }
324 : 8 : }
325 : :
326 : : ev_poll_t *
327 : 0 : ev_loop_get_poll(const ev_loop_t *loop)
328 : : {
329 : : assert(loop);
330 : :
331 : 0 : return loop->poll;
332 : : }
333 : :
334 : : ev_exec_t *
335 : 32 : ev_loop_get_exec(const ev_loop_t *loop)
336 : : {
337 : : assert(loop);
338 : :
339 : 32 : return &loop->exec.exec_vptr;
340 : : }
341 : :
342 : : void
343 : 0 : ev_loop_stop(ev_loop_t *loop)
344 : : {
345 : : assert(loop);
346 : :
347 : : #if !LELY_NO_THREADS
348 : 0 : mtx_lock(&loop->mtx);
349 : : #endif
350 : 0 : ev_loop_do_stop(loop);
351 : : #if !LELY_NO_THREADS
352 : 0 : mtx_unlock(&loop->mtx);
353 : : #endif
354 : 0 : }
355 : :
356 : : int
357 : 7 : ev_loop_stopped(const ev_loop_t *loop)
358 : : {
359 : : assert(loop);
360 : :
361 : : #if !LELY_NO_THREADS
362 : 7 : mtx_lock((mtx_t *)&loop->mtx);
363 : : #endif
364 : 7 : int stopped = loop->stopped;
365 : : #if !LELY_NO_THREADS
366 : 7 : mtx_unlock((mtx_t *)&loop->mtx);
367 : : #endif
368 : 7 : return stopped;
369 : : }
370 : :
371 : : void
372 : 0 : ev_loop_restart(ev_loop_t *loop)
373 : : {
374 : : assert(loop);
375 : :
376 : : #if !LELY_NO_THREADS
377 : 0 : mtx_lock(&loop->mtx);
378 : : #endif
379 : 0 : loop->stopped = 0;
380 : : #if !LELY_NO_THREADS
381 : 0 : mtx_unlock(&loop->mtx);
382 : : #endif
383 : 0 : }
384 : :
385 : : size_t
386 : 8 : ev_loop_wait(ev_loop_t *loop, ev_future_t *future)
387 : : {
388 : 8 : size_t n = 0;
389 : 8 : struct ev_loop_ctx *ctx = NULL;
390 [ + + ]: 16782364 : while (ev_loop_ctx_wait_one(&ctx, loop, future))
391 : 16782356 : n += n < SIZE_MAX;
392 : 8 : ev_loop_ctx_destroy(ctx);
393 : 8 : return n;
394 : : }
395 : :
396 : : size_t
397 : 0 : ev_loop_wait_until(ev_loop_t *loop, ev_future_t *future,
398 : : const struct timespec *abs_time)
399 : : {
400 : 0 : size_t n = 0;
401 : 0 : struct ev_loop_ctx *ctx = NULL;
402 [ # # ]: 0 : while (ev_loop_ctx_wait_one_until(&ctx, loop, future, abs_time))
403 : 0 : n += n < SIZE_MAX;
404 : 0 : ev_loop_ctx_destroy(ctx);
405 : 0 : return n;
406 : : }
407 : :
408 : : size_t
409 : 0 : ev_loop_wait_one(ev_loop_t *loop, ev_future_t *future)
410 : : {
411 : 0 : struct ev_loop_ctx *ctx = NULL;
412 : 0 : size_t n = ev_loop_ctx_wait_one(&ctx, loop, future);
413 : 0 : ev_loop_ctx_destroy(ctx);
414 : 0 : return n;
415 : : }
416 : :
417 : : size_t
418 : 0 : ev_loop_wait_one_until(ev_loop_t *loop, ev_future_t *future,
419 : : const struct timespec *abs_time)
420 : : {
421 : 0 : struct ev_loop_ctx *ctx = NULL;
422 : 0 : size_t n = ev_loop_ctx_wait_one_until(&ctx, loop, future, abs_time);
423 : 0 : ev_loop_ctx_destroy(ctx);
424 : 0 : return n;
425 : : }
426 : :
427 : : void *
428 : 0 : ev_loop_self(void)
429 : : {
430 : 0 : return &ev_loop_thrd;
431 : : }
432 : :
433 : : int
434 : 0 : ev_loop_kill(ev_loop_t *loop, void *thr_)
435 : : {
436 : : #if LELY_NO_THREADS
437 : : (void)loop;
438 : : #else
439 : : assert(loop);
440 : : #endif
441 : 0 : struct ev_loop_thrd *thr = thr_;
442 : : assert(thr);
443 : :
444 : 0 : int result = 0;
445 : 0 : int errc = get_errc();
446 : : #if !LELY_NO_THREADS
447 : 0 : mtx_lock(&loop->mtx);
448 : : #endif
449 [ # # ]: 0 : if (!thr->stopped) {
450 [ # # ]: 0 : if (thr->ctx) {
451 [ # # ]: 0 : if ((result = ev_loop_ctx_kill(thr->ctx, 1)) == -1)
452 : 0 : errc = get_errc();
453 : : } else {
454 : 0 : thr->stopped = 1;
455 : : }
456 : : }
457 : : #if !LELY_NO_THREADS
458 : 0 : mtx_unlock(&loop->mtx);
459 : : #endif
460 : 0 : set_errc(errc);
461 : 0 : return result;
462 : : }
463 : :
464 : : static void
465 : 0 : ev_loop_ctx_task_func(struct ev_task *task)
466 : : {
467 : : assert(task);
468 : 0 : struct ev_loop_ctx *ctx = structof(task, struct ev_loop_ctx, task);
469 : 0 : ev_loop_t *loop = ctx->loop;
470 : : assert(loop);
471 : : assert(ctx->future);
472 : :
473 : : #if LELY_NO_THREADS
474 : : (void)loop;
475 : : #else
476 : 0 : mtx_lock(&loop->mtx);
477 : : #endif
478 [ # # # # : 0 : if (ctx->refcnt > 1 && !*ctx->pstopped && !ctx->ready) {
# # ]
479 : 0 : ctx->ready = 1;
480 : 0 : ev_loop_ctx_kill(ctx, 0);
481 : : }
482 : : #if !LELY_NO_THREADS
483 : 0 : mtx_unlock(&loop->mtx);
484 : : #endif
485 : 0 : ev_loop_ctx_release(ctx);
486 : 0 : }
487 : :
488 : : static struct ev_loop_ctx *
489 : 5 : ev_loop_ctx_alloc(void)
490 : : {
491 : 5 : int errc = 0;
492 : :
493 : 5 : struct ev_loop_ctx *ctx = malloc(sizeof(*ctx));
494 [ - + ]: 5 : if (!ctx) {
495 : : #if !LELY_NO_ERRNO
496 : 0 : errc = errno2c(errno);
497 : : #endif
498 : 0 : goto error_malloc_ctx;
499 : : }
500 : :
501 : 5 : ctx->refcnt = 0;
502 : :
503 : 5 : ctx->loop = NULL;
504 : :
505 : 5 : ctx->future = NULL;
506 : 5 : ctx->task = (struct ev_task)EV_TASK_INIT(NULL, &ev_loop_ctx_task_func);
507 : :
508 : 5 : ctx->pstopped = NULL;
509 : : #if !LELY_NO_THREADS
510 [ - + ]: 5 : if (cnd_init(&ctx->cond) != thrd_success) {
511 : 0 : errc = get_errc();
512 : 0 : goto error_init_cond;
513 : : }
514 : 5 : ctx->waiting = 0;
515 : : #endif
516 : 5 : ctx->ready = 0;
517 : 5 : ctx->polling = 0;
518 : 5 : ctx->thr = NULL;
519 : :
520 : 5 : dlnode_init(&ctx->node);
521 : 5 : ctx->next = NULL;
522 : :
523 : 5 : return ctx;
524 : :
525 : : #if !LELY_NO_THREADS
526 : : // cnd_destroy(&ctx->cond);
527 : 0 : error_init_cond:
528 : : #endif
529 : 0 : free(ctx);
530 : 0 : error_malloc_ctx:
531 : 0 : set_errc(errc);
532 : 0 : return NULL;
533 : : }
534 : :
535 : : static void
536 : 5 : ev_loop_ctx_free(struct ev_loop_ctx *ctx)
537 : : {
538 [ + - ]: 5 : if (ctx) {
539 : : #if !LELY_NO_THREADS
540 : 5 : cnd_destroy(&ctx->cond);
541 : : #endif
542 : 5 : free(ctx);
543 : : }
544 : 5 : }
545 : :
546 : : static void
547 : 5 : ev_loop_ctx_release(struct ev_loop_ctx *ctx)
548 : : {
549 : : assert(ctx);
550 : 5 : ev_loop_t *loop = ctx->loop;
551 : : assert(loop);
552 : :
553 : : #if !LELY_NO_THREADS
554 : 5 : mtx_lock(&loop->mtx);
555 : : #endif
556 [ - + ]: 5 : if (--ctx->refcnt) {
557 : : #if !LELY_NO_THREADS
558 : 0 : mtx_unlock(&loop->mtx);
559 : : #endif
560 : 0 : return;
561 : : }
562 : 5 : ev_future_t *future = ctx->future;
563 : 5 : ctx->future = NULL;
564 : : #if !LELY_NO_THREADS
565 : : assert(!ctx->waiting);
566 : : #endif
567 : : assert(!ctx->polling);
568 [ + - ]: 5 : if (loop->nunused < LELY_EV_LOOP_CTX_MAX_UNUSED) {
569 : 5 : ctx->next = loop->unused;
570 : 5 : loop->unused = ctx;
571 : 5 : loop->nunused++;
572 : : #if !LELY_NO_THREADS
573 : 5 : mtx_unlock(&loop->mtx);
574 : : #endif
575 : 5 : ev_future_release(future);
576 : : } else {
577 : : #if !LELY_NO_THREADS
578 : 0 : mtx_unlock(&loop->mtx);
579 : : #endif
580 : 0 : ev_future_release(future);
581 : 0 : ev_loop_ctx_free(ctx);
582 : : }
583 : : }
584 : :
585 : : static struct ev_loop_ctx *
586 : 5 : ev_loop_ctx_create(ev_loop_t *loop, ev_future_t *future)
587 : : {
588 : : assert(loop);
589 : :
590 : : #if !LELY_NO_THREADS
591 : 5 : mtx_lock(&loop->mtx);
592 : : #endif
593 : 5 : struct ev_loop_ctx *ctx = loop->unused;
594 [ - + ]: 5 : if (ctx) {
595 : 0 : loop->unused = ctx->next;
596 : : assert(loop->nunused);
597 : 0 : loop->nunused--;
598 : : #if !LELY_NO_THREADS
599 : 0 : mtx_unlock(&loop->mtx);
600 : : #endif
601 : : } else {
602 : : #if !LELY_NO_THREADS
603 : 5 : mtx_unlock(&loop->mtx);
604 : : #endif
605 : 5 : ctx = ev_loop_ctx_alloc();
606 [ - + ]: 5 : if (!ctx)
607 : 0 : return NULL;
608 : : }
609 : :
610 : : assert(!ctx->refcnt);
611 : 5 : ctx->refcnt++;
612 : :
613 : 5 : ctx->loop = loop;
614 : :
615 : 5 : ctx->future = ev_future_acquire(future);
616 : 5 : ctx->task = (struct ev_task)EV_TASK_INIT(
617 : : ev_loop_get_exec(loop), &ev_loop_ctx_task_func);
618 : :
619 : 5 : ctx->pstopped = &ev_loop_thrd.stopped;
620 : :
621 : : #if !LELY_NO_THREADS
622 : : assert(!ctx->waiting);
623 : : #endif
624 : 5 : ctx->ready = 0;
625 : : assert(!ctx->polling);
626 [ + - ]: 5 : ctx->thr = loop->poll ? ev_poll_self(loop->poll) : NULL;
627 : :
628 : 5 : dlnode_init(&ctx->node);
629 : :
630 : 5 : ctx->next = ev_loop_thrd.ctx;
631 : 5 : ev_loop_thrd.ctx = ctx;
632 : :
633 [ - + ]: 5 : if (ctx->future) {
634 : 0 : ctx->refcnt++;
635 : 0 : ev_future_submit(ctx->future, &ctx->task);
636 : : }
637 : :
638 : 5 : return ctx;
639 : : }
640 : :
641 : : static void
642 : 8 : ev_loop_ctx_destroy(struct ev_loop_ctx *ctx)
643 : : {
644 [ + + ]: 8 : if (ctx) {
645 [ - + ]: 5 : if (ctx->future)
646 : 0 : ev_future_cancel(ctx->future, &ctx->task);
647 : :
648 : : assert(ev_loop_thrd.ctx == ctx);
649 : 5 : ev_loop_thrd.ctx = ctx->next;
650 : :
651 : 5 : ev_loop_ctx_release(ctx);
652 : : }
653 : 8 : }
654 : :
655 : : static inline int
656 : 312 : ev_loop_can_poll(ev_loop_t *loop)
657 : : {
658 [ + - + - : 312 : return loop->poll && (!loop->npoll || loop->npolling < loop->npoll);
+ - ]
659 : : }
660 : :
661 : : static size_t
662 : 16782364 : ev_loop_ctx_wait_one(
663 : : struct ev_loop_ctx **pctx, ev_loop_t *loop, ev_future_t *future)
664 : : {
665 : : assert(pctx);
666 : 16782364 : struct ev_loop_ctx *ctx = *pctx;
667 : : assert(loop);
668 : : assert(!ctx || ctx->loop == loop);
669 : :
670 : 16782364 : size_t n = 0;
671 : : #if !LELY_NO_THREADS
672 : 16782364 : mtx_lock(&loop->mtx);
673 : : #endif
674 : 16782364 : int poll_task = 0;
675 [ + + + + : 16782688 : while (!loop->stopped && (!ctx || (!*ctx->pstopped && !ctx->ready))) {
+ - + - ]
676 : : // Stop the event loop if no more tasks remain or have been
677 : : // announced with ev_exec_on_task_init(), unless we should be
678 : : // waiting on a future but have not created the event loop
679 : : // context yet.
680 [ + + + + ]: 16782680 : if (ev_loop_empty(loop) && !ev_loop_ntasks(loop)
681 [ - + - - ]: 7 : && (!future || ctx)) {
682 : 7 : ev_loop_do_stop(loop);
683 : 7 : continue;
684 : : }
685 : 16782673 : struct ev_task *task = ev_task_from_node(
686 : : sllist_pop_front(&loop->queue));
687 [ + + - + ]: 16782673 : if (task && task == &loop->task) {
688 : : // The polling task is not a real task, but is part of
689 : : // the task queue for scheduling purposes.
690 : 0 : poll_task = 1;
691 : 0 : task = NULL;
692 : : }
693 [ + + ]: 16782673 : if (task) {
694 : : // If a real task is available, execute it and return.
695 : : #if !LELY_NO_THREADS
696 : 16782356 : mtx_unlock(&loop->mtx);
697 : : #endif
698 : : assert(task->exec);
699 : 16782356 : ev_exec_run(task->exec, task);
700 : 16782356 : n++;
701 : : #if !LELY_NO_THREADS
702 : 16782356 : mtx_lock(&loop->mtx);
703 : : #endif
704 : 16782356 : break;
705 : : }
706 [ + + ]: 317 : if (!ctx) {
707 : : // Only create an event loop context when we have to
708 : : // poll or wait.
709 : : #if !LELY_NO_THREADS
710 : 5 : mtx_unlock(&loop->mtx);
711 : : #endif
712 : 5 : ctx = *pctx = ev_loop_ctx_create(loop, future);
713 [ - + ]: 5 : if (!ctx) {
714 : : #if !LELY_NO_THREADS
715 : 0 : mtx_lock(&loop->mtx);
716 : : #endif
717 : 0 : break;
718 : : }
719 : : #if !LELY_NO_THREADS
720 : 5 : mtx_lock(&loop->mtx);
721 : : #endif
722 : : // We released the lock, so a task may have been queued.
723 : 5 : continue;
724 : : }
725 [ + - ]: 312 : if (ev_loop_can_poll(loop)) {
726 : 312 : ctx->polling = 1;
727 : : // Wake polling threads in LIFO order.
728 : 312 : dllist_push_front(&loop->polling, &ctx->node);
729 : 312 : loop->npolling++;
730 : 312 : int empty = sllist_empty(&loop->queue);
731 : : #if !LELY_NO_THREADS
732 : 312 : mtx_unlock(&loop->mtx);
733 : : #endif
734 [ + - ]: 312 : int result = ev_poll_wait(loop->poll, empty ? -1 : 0);
735 : : #if !LELY_NO_THREADS
736 : 312 : mtx_lock(&loop->mtx);
737 : : #endif
738 : 312 : loop->npolling--;
739 : 312 : dllist_remove(&loop->polling, &ctx->node);
740 : 312 : ctx->polling = 0;
741 [ - + ]: 312 : if (result == -1)
742 : 0 : break;
743 [ # # ]: 0 : } else if (!sllist_empty(&loop->queue)) {
744 : 0 : continue;
745 : : } else {
746 : : #if LELY_NO_THREADS
747 : : break;
748 : : #else // !LELY_NO_THREADS
749 : 0 : ctx->waiting = 1;
750 : : // Wake waiting threads in LIFO order.
751 : 0 : dllist_push_front(&loop->waiting, &ctx->node);
752 : 0 : int result = cnd_wait(&ctx->cond, &loop->mtx);
753 : 0 : dllist_remove(&loop->waiting, &ctx->node);
754 : 0 : ctx->waiting = 0;
755 [ # # ]: 0 : if (result != thrd_success)
756 : 0 : break;
757 : : #endif // !LELY_NO_THREADS
758 : : }
759 : : }
760 : 16782364 : int empty = sllist_empty(&loop->queue);
761 [ - + ]: 16782364 : if (poll_task)
762 : : // Requeue the polling task.
763 : 0 : sllist_push_back(&loop->queue, &loop->task._node);
764 [ + + ]: 16782364 : if (!empty)
765 : : // If any real tasks remain on the queue, wake up any polling or
766 : : // non-polling thread.
767 : 16782039 : ev_loop_kill_any(loop, 1);
768 [ - + ]: 325 : else if (poll_task)
769 : : // Wake up any non-polling thread so it can start polling.
770 : 0 : ev_loop_kill_any(loop, 0);
771 [ + + + + : 16782364 : if (!n && ctx && *ctx->pstopped)
- + ]
772 : : // Reset the thread-local flag used to stop an event loop with
773 : : // ev_loop_kill(), so it will resume on the next run funciton.
774 : 0 : *ctx->pstopped = 0;
775 : : #if !LELY_NO_THREADS
776 : 16782364 : mtx_unlock(&loop->mtx);
777 : : #endif
778 : 16782364 : return n;
779 : : }
780 : :
781 : : static size_t
782 : 0 : ev_loop_ctx_wait_one_until(struct ev_loop_ctx **pctx, ev_loop_t *loop,
783 : : ev_future_t *future, const struct timespec *abs_time)
784 : : {
785 : : assert(pctx);
786 : 0 : struct ev_loop_ctx *ctx = *pctx;
787 : : assert(loop);
788 : : assert(!ctx || ctx->loop == loop);
789 : : #if LELY_NO_THREADS && LELY_NO_TIMEOUT
790 : : (void)abs_time;
791 : : #endif
792 : :
793 : 0 : size_t n = 0;
794 : : #if !LELY_NO_THREADS
795 : 0 : mtx_lock(&loop->mtx);
796 : : #endif
797 : 0 : int poll_task = 0;
798 [ # # # # : 0 : while (!loop->stopped && (!ctx || (!*ctx->pstopped && !ctx->ready))) {
# # # # ]
799 : : // Stop the event loop if no more tasks remain or have been
800 : : // announced with ev_exec_on_task_init(), unless we should be
801 : : // waiting on a future but have not created the event loop
802 : : // context yet.
803 [ # # # # ]: 0 : if (ev_loop_empty(loop) && !ev_loop_ntasks(loop)
804 [ # # # # ]: 0 : && (!future || ctx)) {
805 : 0 : ev_loop_do_stop(loop);
806 : 0 : continue;
807 : : }
808 : 0 : struct ev_task *task = ev_task_from_node(
809 : : sllist_pop_front(&loop->queue));
810 [ # # # # ]: 0 : if (task && task == &loop->task) {
811 : : // The polling task is not a real task, but is part of
812 : : // the task queue for scheduling purposes.
813 : 0 : poll_task = 1;
814 : 0 : task = NULL;
815 : : }
816 [ # # ]: 0 : if (task) {
817 : : // If a real task is available, execute it and return.
818 : : #if !LELY_NO_THREADS
819 : 0 : mtx_unlock(&loop->mtx);
820 : : #endif
821 : : assert(task->exec);
822 : 0 : ev_exec_run(task->exec, task);
823 : 0 : n++;
824 : : #if !LELY_NO_THREADS
825 : 0 : mtx_lock(&loop->mtx);
826 : : #endif
827 : 0 : break;
828 : : }
829 [ # # ]: 0 : if (!ctx) {
830 : : // Only create an event loop context when we have to
831 : : // poll or wait.
832 : : #if !LELY_NO_THREADS
833 : 0 : mtx_unlock(&loop->mtx);
834 : : #endif
835 : 0 : ctx = *pctx = ev_loop_ctx_create(loop, future);
836 [ # # ]: 0 : if (!ctx) {
837 : : #if !LELY_NO_THREADS
838 : 0 : mtx_lock(&loop->mtx);
839 : : #endif
840 : 0 : break;
841 : : }
842 : : #if !LELY_NO_THREADS
843 : 0 : mtx_lock(&loop->mtx);
844 : : #endif
845 : : // We released the lock, so a task may have been queued.
846 : 0 : continue;
847 : : }
848 [ # # ]: 0 : if (ev_loop_can_poll(loop)) {
849 : 0 : ctx->polling = 1;
850 : : // Wake polling threads in LIFO order.
851 : 0 : dllist_push_front(&loop->polling, &ctx->node);
852 : 0 : loop->npolling++;
853 : : #if !LELY_NO_TIMEOUT
854 : 0 : int empty = sllist_empty(&loop->queue);
855 : : #endif
856 : : #if !LELY_NO_THREADS
857 : 0 : mtx_unlock(&loop->mtx);
858 : : #endif
859 : 0 : int result = -1;
860 : 0 : int64_t msec = 0;
861 : : #if !LELY_NO_TIMEOUT
862 [ # # # # ]: 0 : if (empty && abs_time) {
863 : 0 : struct timespec now = { 0, 0 };
864 [ # # ]: 0 : if (!timespec_get(&now, TIME_UTC))
865 : 0 : goto error;
866 [ # # ]: 0 : if (timespec_cmp(abs_time, &now) <= 0) {
867 : 0 : set_errnum(ERRNUM_TIMEDOUT);
868 : 0 : goto error;
869 : : }
870 : 0 : msec = timespec_diff_msec(abs_time, &now);
871 [ # # ]: 0 : if (!msec)
872 : : // Prevent a busy loop due to rounding.
873 : 0 : msec = 1;
874 [ # # ]: 0 : else if (msec > INT_MAX)
875 : 0 : msec = INT_MAX;
876 : : }
877 : : #endif // !LELY_NO_TIMEOUT
878 : 0 : result = ev_poll_wait(loop->poll, msec);
879 : : #if !LELY_NO_TIMEOUT
880 : 0 : error:
881 : : #endif
882 : : #if !LELY_NO_THREADS
883 : 0 : mtx_lock(&loop->mtx);
884 : : #endif
885 : 0 : loop->npolling--;
886 : 0 : dllist_remove(&loop->polling, &ctx->node);
887 : 0 : ctx->polling = 0;
888 [ # # ]: 0 : if (result == -1)
889 : 0 : break;
890 [ # # ]: 0 : } else if (!sllist_empty(&loop->queue)) {
891 : 0 : continue;
892 : : #if !LELY_NO_THREADS && !LELY_NO_TIMEOUT
893 [ # # ]: 0 : } else if (abs_time) {
894 : 0 : ctx->waiting = 1;
895 : : // Wake waiting threads in LIFO order.
896 : 0 : dllist_push_front(&loop->waiting, &ctx->node);
897 : 0 : int result = cnd_timedwait(
898 : : &ctx->cond, &loop->mtx, abs_time);
899 : 0 : dllist_remove(&loop->waiting, &ctx->node);
900 : 0 : ctx->waiting = 0;
901 [ # # ]: 0 : if (result != thrd_success) {
902 [ # # ]: 0 : if (result == thrd_timedout)
903 : 0 : set_errnum(ERRNUM_TIMEDOUT);
904 : 0 : break;
905 : : }
906 : : #endif // !LELY_NO_THREADS
907 : : } else {
908 : 0 : break;
909 : : }
910 : : }
911 : 0 : int empty = sllist_empty(&loop->queue);
912 [ # # ]: 0 : if (poll_task)
913 : : // Requeue the polling task.
914 : 0 : sllist_push_back(&loop->queue, &loop->task._node);
915 [ # # ]: 0 : if (!empty)
916 : : // If any real tasks remain on the queue, wake up any polling or
917 : : // non-polling thread.
918 : 0 : ev_loop_kill_any(loop, 1);
919 [ # # ]: 0 : else if (poll_task)
920 : : // Wake up any non-polling thread so it can start polling.
921 : 0 : ev_loop_kill_any(loop, 0);
922 [ # # # # : 0 : if (!n && ctx && *ctx->pstopped)
# # ]
923 : : // Reset the thread-local flag used to stop an event loop with
924 : : // ev_loop_kill(), so it will resume on the next run funciton.
925 : 0 : *ctx->pstopped = 0;
926 : : #if !LELY_NO_THREADS
927 : 0 : mtx_unlock(&loop->mtx);
928 : : #endif
929 : 0 : return n;
930 : : }
931 : :
932 : : static int
933 : 312 : ev_loop_ctx_kill(struct ev_loop_ctx *ctx, int stop)
934 : : {
935 : : assert(ctx);
936 : : assert(ctx->loop);
937 : : assert(ctx->pstopped);
938 : : assert(!ctx->polling || ctx->loop->poll);
939 : :
940 [ - + ]: 312 : if (*ctx->pstopped)
941 : 0 : return 0;
942 [ - + ]: 312 : else if (stop)
943 : 0 : *ctx->pstopped = 1;
944 : :
945 : : #if !LELY_NO_THREADS
946 [ - + ]: 312 : if (ctx->waiting) {
947 : 0 : cnd_signal(&ctx->cond);
948 : 0 : return 0;
949 : : }
950 : : #endif
951 [ + - ]: 312 : return ctx->polling ? ev_poll_kill(ctx->loop->poll, ctx->thr) : 0;
952 : : }
953 : :
954 : : static void
955 : 3490 : ev_loop_std_exec_impl_on_task_init(ev_std_exec_impl_t *impl)
956 : : {
957 : 3490 : ev_loop_t *loop = ev_loop_from_impl(impl);
958 : :
959 : : #if LELY_NO_THREADS || (LELY_NO_ATOMICS && (!_WIN32 || defined(__MINGW32__)))
960 : : loop->ntasks++;
961 : : #elif _WIN64 && !defined(__MINGW32__)
962 : : InterlockedIncrementNoFence64(&loop->ntasks);
963 : : #elif _WIN32 && !defined(__MINGW32__)
964 : : InterlockedIncrementNoFence(&loop->ntasks);
965 : : #else
966 : 3490 : atomic_fetch_add_explicit(&loop->ntasks, 1, memory_order_relaxed);
967 : : #endif
968 : 3490 : }
969 : :
970 : : static void
971 : 3490 : ev_loop_std_exec_impl_on_task_fini(ev_std_exec_impl_t *impl)
972 : : {
973 : 3490 : ev_loop_t *loop = ev_loop_from_impl(impl);
974 : :
975 : : #if LELY_NO_THREADS || (LELY_NO_ATOMICS && (!_WIN32 || defined(__MINGW32__)))
976 : : if (!--loop->ntasks) {
977 : : #elif _WIN64 && !defined(__MINGW32__)
978 : : if (!InterlockedDecrementRelease64(&loop->ntasks)) {
979 : : MemoryBarrier();
980 : : #elif _WIN32 && !defined(__MINGW32__)
981 : : if (!InterlockedDecrementRelease(&loop->ntasks)) {
982 : : MemoryBarrier();
983 : : #else
984 [ + + ]: 3490 : if (atomic_fetch_sub_explicit(&loop->ntasks, 1, memory_order_release)
985 : : == 1) {
986 : 19 : atomic_thread_fence(memory_order_acquire);
987 : : #endif
988 : : #if !LELY_NO_THREADS
989 : 19 : mtx_lock(&loop->mtx);
990 : : #endif
991 [ + + ]: 19 : if (ev_loop_empty(loop))
992 : 1 : ev_loop_do_stop(loop);
993 : : #if !LELY_NO_THREADS
994 : 19 : mtx_unlock(&loop->mtx);
995 : : #endif
996 : : }
997 : 3490 : }
998 : :
999 : : static void
1000 : 16782357 : ev_loop_std_exec_impl_post(ev_std_exec_impl_t *impl, struct ev_task *task)
1001 : : {
1002 : 16782357 : ev_loop_t *loop = ev_loop_from_impl(impl);
1003 : : assert(task);
1004 : :
1005 : : #if !LELY_NO_THREADS
1006 : 16782357 : mtx_lock(&loop->mtx);
1007 : : #endif
1008 : 16782357 : int empty = sllist_empty(&loop->queue);
1009 : 16782357 : sllist_push_back(&loop->queue, &task->_node);
1010 [ + + ]: 16782357 : if (empty)
1011 : 16778893 : ev_loop_kill_any(loop, 1);
1012 : : #if !LELY_NO_THREADS
1013 : 16782357 : mtx_unlock(&loop->mtx);
1014 : : #endif
1015 : 16782357 : }
1016 : :
1017 : : static size_t
1018 : 1 : ev_loop_std_exec_impl_abort(ev_std_exec_impl_t *impl, struct ev_task *task)
1019 : : {
1020 : 1 : ev_loop_t *loop = ev_loop_from_impl(impl);
1021 : :
1022 : : struct sllist queue;
1023 : 1 : sllist_init(&queue);
1024 : :
1025 : : #if !LELY_NO_THREADS
1026 : 1 : mtx_lock(&loop->mtx);
1027 : : #endif
1028 [ - + ]: 1 : if (!task) {
1029 : 0 : int poll_task = 0;
1030 : : struct slnode *node;
1031 [ # # ]: 0 : while ((node = sllist_pop_front(&loop->queue))) {
1032 [ # # ]: 0 : if (ev_task_from_node(node) == &loop->task)
1033 : 0 : poll_task = 1;
1034 : : else
1035 : 0 : sllist_push_back(&queue, node);
1036 : : }
1037 [ # # ]: 0 : if (poll_task)
1038 : 0 : sllist_push_back(&loop->queue, &loop->task._node);
1039 [ + - ]: 1 : } else if (sllist_remove(&loop->queue, &task->_node)) {
1040 : 1 : sllist_push_back(&queue, &task->_node);
1041 : : }
1042 : : #if !LELY_NO_THREADS
1043 : 1 : mtx_unlock(&loop->mtx);
1044 : : #endif
1045 : :
1046 : 1 : size_t n = 0;
1047 [ + + ]: 2 : while (sllist_pop_front(&queue))
1048 : 1 : n += n < SIZE_MAX;
1049 : 1 : return n;
1050 : : }
1051 : :
1052 : : static inline ev_loop_t *
1053 : 16789338 : ev_loop_from_impl(const ev_std_exec_impl_t *impl)
1054 : : {
1055 : : assert(impl);
1056 : :
1057 : 16789338 : return structof(impl, ev_loop_t, impl_vptr);
1058 : : }
1059 : :
1060 : : static int
1061 : 16782699 : ev_loop_empty(const ev_loop_t *loop)
1062 : : {
1063 : : assert(loop);
1064 : :
1065 [ + + ]: 16782699 : if (sllist_empty(&loop->queue))
1066 : 325 : return 1;
1067 : 16782374 : struct slnode *node = sllist_first(&loop->queue);
1068 : : // The task queue is considered empty if only the polling task remains.
1069 [ - + - - ]: 16782374 : return node == &loop->task._node && node->next == NULL;
1070 : : }
1071 : :
1072 : : static size_t
1073 : 324 : ev_loop_ntasks(const ev_loop_t *loop)
1074 : : {
1075 : : assert(loop);
1076 : :
1077 : : #if LELY_NO_THREADS || LELY_NO_ATOMICS || (_WIN32 && !defined(__MINGW32__))
1078 : : return loop->ntasks;
1079 : : #else
1080 : 324 : return atomic_load_explicit(
1081 : : (atomic_size_t *)&loop->ntasks, memory_order_relaxed);
1082 : : #endif
1083 : : }
1084 : :
1085 : : static void
1086 : 8 : ev_loop_do_stop(ev_loop_t *loop)
1087 : : {
1088 : : assert(loop);
1089 : :
1090 [ + - ]: 8 : if (!loop->stopped) {
1091 : 8 : loop->stopped = 1;
1092 : : #if !LELY_NO_THREADS
1093 [ - + - - : 8 : dllist_foreach (&loop->waiting, node) {
- + ]
1094 : 0 : struct ev_loop_ctx *ctx = structof(
1095 : : node, struct ev_loop_ctx, node);
1096 : 0 : ev_loop_ctx_kill(ctx, 1);
1097 : : }
1098 : : #endif
1099 [ - + - - : 8 : dllist_foreach (&loop->polling, node) {
- + ]
1100 : 0 : struct ev_loop_ctx *ctx = structof(
1101 : : node, struct ev_loop_ctx, node);
1102 : 0 : ev_loop_ctx_kill(ctx, 1);
1103 : : }
1104 : : }
1105 : 8 : }
1106 : :
1107 : : static int
1108 : 33560932 : ev_loop_kill_any(ev_loop_t *loop, int polling)
1109 : : {
1110 : : assert(loop);
1111 : :
1112 : : struct dlnode *node;
1113 : : #if !LELY_NO_THREADS
1114 [ - + ]: 33560932 : if ((node = dllist_first(&loop->waiting))) {
1115 : 0 : struct ev_loop_ctx *ctx =
1116 : 0 : structof(node, struct ev_loop_ctx, node);
1117 : 0 : return ev_loop_ctx_kill(ctx, 0);
1118 : : }
1119 : : #endif
1120 [ + - + + ]: 33560932 : if (polling && (node = dllist_first(&loop->polling))) {
1121 : 312 : struct ev_loop_ctx *ctx =
1122 : 312 : structof(node, struct ev_loop_ctx, node);
1123 : 312 : return ev_loop_ctx_kill(ctx, 0);
1124 : : }
1125 : 33560620 : return 0;
1126 : : }
1127 : :
1128 : : #endif // !LELY_NO_MALLOC
|